package com.tpvlog.dfs.datanode.resgiter;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.tpvlog.dfs.datanode.file.DataNodeInfo;
import com.tpvlog.dfs.datanode.file.ReplicateManager;
import com.tpvlog.dfs.datanode.file.StorageManager;
import com.tpvlog.dfs.datanode.client.NameNodeRpcClient;
import com.tpvlog.dfs.rpc.service.HeartbeatResponse;
import com.tpvlog.dfs.rpc.service.RegisterResponse;

import java.io.File;

/**
 * Data的服务注册和心跳管理组件
 *
 * @author Ressmix
 */
public class LeaseManager {

    public static final Integer STATUS_SUCCESS = 1;
    public static final Integer STATUS_FAILURE = 2;
    public static final Integer COMMAND_REGISTER = 1;
    public static final Integer COMMAND_REPORT_COMPLETE_STORAGE_INFO = 2;
    public static final Integer COMMAND_REPLICATE = 3;
    public static final Integer COMMAND_REMOVE_REPLICA = 4;

    private NameNodeRpcClient rpcClient;
    private StorageManager storageManager;
    private ReplicateManager replicateManager;

    public LeaseManager(NameNodeRpcClient rpcClient, StorageManager storageManager, ReplicateManager replicateManager) {
        this.rpcClient = rpcClient;
        this.storageManager = storageManager;
        this.replicateManager = replicateManager;
    }

    public Boolean register() {
        RegisterResponse response = rpcClient.register();
        if (response.getStatus() == STATUS_SUCCESS) {
            // 首次注册成功
            System.out.println("首次注册成功，需要全量上报存储信息......");
            DataNodeInfo dataNodeInfo = storageManager.getStorageInfo();
            rpcClient.fullyReportDataNodeInfo(dataNodeInfo);
            return true;
        } else {
            // 节点已经存在不能重复注册
            System.out.println("节点已注册，不需要全量上报存储信息......");
            return false;
        }
    }

    public void heartbeat() {
        new HeartbeatThread().start();
    }

/**
 * 负责心跳的线程
 */
private class HeartbeatThread extends Thread {
    @Override
    public void run() {
        while (true) {
            try {
                // 发送心跳
                HeartbeatResponse response = rpcClient.heartbeat();
                if (response.getStatus() == STATUS_SUCCESS) {
                    JSONArray commands = JSONArray.parseArray(response.getCommands());
                    if (commands.size() > 0) {
                        for (int i = 0; i < commands.size(); i++) {
                            JSONObject command = commands.getJSONObject(i);
                            Integer type = command.getInteger("type");
                            JSONObject task = command.getJSONObject("content");

                            if (type.equals(COMMAND_REPLICATE)) {
                                replicateManager.addReplicateTask(task);
                                System.out.println("接收副本复制任务，" + command);
                            } else if (type.equals(COMMAND_REMOVE_REPLICA)) {
                                System.out.println("接收副本删除任务，" + command);
                                String filename = task.getString("filename");
                                String absoluteFilename = DataNodeInfo.getAbsoluteFilename(filename);
                                File file = new File(absoluteFilename);
                                if (file.exists()) {
                                    file.delete();
                                }
                            }
                        }
                    }
                }else if(response.getStatus()==STATUS_FAILURE){
                    JSONArray commands = JSONArray.parseArray(response.getCommands());
                    for(int i = 0; i < commands.size(); i++) {
                        JSONObject command = commands.getJSONObject(i);
                        Integer type = command.getInteger("type");

                        // 如果是注册的命令
                        if(type.equals(COMMAND_REGISTER)) {
                            rpcClient.register();
                        }
                        // 如果是全量上报的命令
                        else if(type.equals(COMMAND_REPORT_COMPLETE_STORAGE_INFO)) {
                            DataNodeInfo storageInfo = storageManager.getStorageInfo();
                            rpcClient.fullyReportDataNodeInfo(storageInfo);
                        }
                    }
                }
                Thread.sleep(30 * 1000);
            } catch (Exception e) {
                System.out.println("当前NameNode不可用，心跳失败.......");
            }
        }
    }
}
}
